-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IterableStore #191
IterableStore #191
Conversation
* For other stores, the iterable should ideally be backed by a stream. | ||
*/ | ||
trait IterableStore[K, +V] extends ReadableStore[K, V] { | ||
def iterator: Future[Iterator[(K, V)]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be a Future[Spool[(K,V)]]. We don't want blocking on .next. It should be easy to convert an Iteratable to a Spool to get the map implementation.
Changed to Spool[(K, V)]. |
*/ | ||
trait IterableStore[K, V] extends ReadableStore[K, V] { | ||
|
||
protected def iteratorToSpool(it: Iterator[(K, V)]): Future[Spool[(K, V)]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can these two methods move to the companion object?
Thanks for the further feedback. Updated the diff. |
s | ||
} | ||
|
||
protected def fillSpool[K, V](it: Iterator[(K, V)], s: Promise[Spool[(K, V)]]): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will actually eagerly evaluate the iterator, which I don't think you want. See this:
scala> def fillSpool[K, V](it: Iterator[(K, V)], s: Promise[Spool[(K, V)]]): Unit = {
| if (it.isEmpty) {
| s() = Return(Spool.empty[(K, V)])
| } else {
| val next = new Promise[Spool[(K, V)]]
| s() = Return(it.next *:: next)
| fillSpool(it, next)
| }
| }
fillSpool: [K, V](it: Iterator[(K, V)], s: com.twitter.util.Promise[com.twitter.concurrent.Spool[(K, V)]])Unit
scala> val s = new Promise[Spool[(Int,Int)]]
s: com.twitter.util.Promise[com.twitter.concurrent.Spool[(Int, Int)]] = Promise@793497799(state=Waiting(null,List()))
scala> val it = Seq(1,2,3,4,5).iterator.map { i => println("hi!"); (i, i*2) }
it: Iterator[(Int, Int)] = non-empty iterator
scala> fillSpool(it, s)
hi!
hi!
hi!
hi!
hi!
I think you want to lazily evaluate the iterator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we just use the existing SpoolSource to create these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. We'd want lazy evaluation here. The following did not work for me either:
def iterator2Spool[K, V](it: Iterator[(K, V)]): Spool[(K, V)] = {
if (it.isEmpty) {
Spool.empty[(K, V)]
} else {
Spool.cons(it.next, Future.value(iterator2Spool(it)))
}
}
Maybe I'm missing something about Spool does deferred tails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we actually want a Spool here? It seems to me that a Spool is useful when you want to essentially have a Stream whose tail is evaluated not lazily, but rather asynchronously (still eagerly, but just asynchronously). I guess we could see how it's used in Finagle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, seems like we'll need to create a lazy version of Spool (doable I think), or maybe use Stream[Future[(K, V)]] instead.
@johnynek thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about this more, it might be best to provide an api that lets you provide a batchSize, and just return an Iterable[Future[(K, V)]]. The other possibility would be to supply a new Cursor abstraction, which lets you specify a batch size.
class Cursor[A] {
def fetch(batchSize: Int): Future[Seq[A]]
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thoughts:
- lazy or bust.
- I think a type like:
trait FStream[+T] {
def head: Future[T]
def tail: FStream[T]
}
is what we want (I think a StreamT monad transformer on Future is what is happening here, I think scalaz has this). I thought this was basically Spool.
-
If you have Iterable[Future[T]] you can do. .toList.size which implies you can get the size without waiting, which is false. We don't know the cardinality of the keys without an Await.
-
I think we should stay concrete while we abstract: what will we implement? I would like to use this paging to handle things like a ReadableStore[K, FStream[V]] where K is a user, and V are the followers: since some users have millions of followers, we can't assume we can fit the response in on unpaged value. What other use cases do people have?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Are you going to permit people to do out of order requests? What will you do if I do
// assume stream: FStream[A]
val tail = stream.tail
tail.tail.head
tail.head
? Does the iterable abstraction really support that?
-
I think batching is pretty nice. You might be able to refactor FStream to support batching by specifying a headsize with your tail.
-
.toList.size would behave the same as it usually does with infinite iterables (ie it would never end). The semantics would definitely be weird, but the right way for it to work would be to just start sending Future.exceptions after a certain point. I don't think this is necessarily the right way--you might want to convert it to a Spool if someone tries to do a "foreach"-style operation so that it will be correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey gang: Spool is supposed to have deferred behavior, but we found a few bugs in the *::
syntax that were forcing the tail. I'll see if I can "force" a util-core
release including the fix. #rimshot
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, @evnm released util-core 6.11.0, which includes the Spool fix: twitter/util@12faddb
@mosesn +10 for a Cursor interface. I'm waiting for this to go to trait Cursor[S, A] {
def fetch(state: S): Future[Seq[A]]
} or even just a function alias defined in the iterable store companion object type Cursor[S, A] = (S => Future[Seq[A]]) I'm probably getting a head of myself by I thought it would be worth bringing up as other types of iterable ( by value or by key) stores may have the same requirements. |
Thanks for the ideas. My initial thinking was that IterableStore can be used for iterating over key-values, keys, etc. and a separate Paging store can be used for paginated reads. However, now I realize Iterable is just a special case of Paging store (i.e. batch size 1). Given this, I think a cursor makes sense and can be applied across these different use cases. To me, it seems like the fetch call would also need to return an updated cursor to be used for the subsequent fetch. Something like: trait Cursor[S, A] {
def fetch(state: S): Future[(S, Seq[A])]
} (Let me know if you guys think there's a better way to capture the updated cursor.) For stores like redis, S could be an integer representing the cursor in Does this adequately cover the use cases we foresee as of now? |
This was something that occurred to me when looking at the instrumentation branch. The *Store interfaces are general enough to be remixed into combinator features so that store combinators can be implemented in terms of other stores. trait Cursor[S,A] extends ReadableStore[S, Seq[A]]
Want the next page? cursor.get(state) Want to fetch multiple pages at once? cursor.multiGet(states) |
It would actually be this, right? trait Cursor[S,A] extends ReadableStore[S, (S,Seq[A])] And yes, that feels right to me - it's similar to a pivot combinator... |
ah yes. a client would need access to |
+1 |
+1 @avibryant for my edification, do you have any links to more information on the pivot combinator? |
I was referring to https://github.com/twitter/storehaus/blob/develop/storehaus-core/src/main/scala/com/twitter/storehaus/PivotedReadableStore.scala ... not exactly the same thing, but similar in that it is transforming the key space in a pretty significant way. |
Sounds good. One remaining point.. how do we denote start state (or empty cursor) for the first get? One way is to force Option[S] and have None denote the start state. Alternatively, we can let the implementation define it. abstract class CursorState[S] {
def currentState: S
def startState: S
}
trait Cursor[S,A] extends ReadableStore[CursorState[S], (CursorState[S],Seq[A])] I have no strong preference, but the latter seems more flexible? Sorry for the churn. This did not occur to me earlier. |
We could require a zero type class at construction time.
|
*/ | ||
trait IterableStore[K, V] extends ReadableStore[K, V] { | ||
|
||
def items: Future[Spool[(K, V)]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a thought. entries
may be a better name for this. From a user perspective Stores are a lot like java.util.Maps
in which the equiv name would be entries
. I totally get items
though for those you are from a python background and have a dict in mind when they think of stores.
If you wanted a split between pagable stores and iterable stores, for the pagable flavor you may not need the first type arg to be any more structured/complex than an abstract type, here's a rough sketch case class MySQLCursor[T](query: MySqlQuery[T]) extends Cursor[(Long, Long), T] {
def get(state: (Long, Long)) = {
val (offset, limit) = state
query.offset(offset).limit(limit).fetch().map { values =>
((offset + limit, limit), values)
}
}
}
val cursor = MySQLCursor(mysqlQuery)
def all(from: (Long, Long)) = cursor.get(from).map {
case (next, values) = values ++ all(next)
}
// read 100 at a time
all((0, 100)) iterable stores would need more structure like an initial state, and an end state to indicate there are no more results. A type class for this would probably be best |
This looks a little like the State monad actually. |
@mosesn if we pull too hard on that we will see that all of storehaus is just kleisli composition on the Future[Option[_]] monad, and these generalizations are just changing the monad. The read thing is T => M[U] where there is a Monad[M]. That said, I think there is value in making these combinators a bit more concrete for people |
+1 @johnynek I think if you look hard enough everything reveals it self to be some form of monad :) The judgment of whether or not to make that explicit is up to the man behind the curtain. Sometimes the audience will give just as much applause during the show with or without that knowledge. |
@johnynek lol ok |
I don't think that an independent Cursor type is necessary. A "pageable" store is still iterable, you're just iterating over batches of entries. One approach that would be somewhat symmetrical to the WriteableStore.{put, multiPut} split would be:
Such that the per-item Spool can be defined in terms of either batches or individual entries. |
@softprops @avibryant On second thought, it might be clearer to make the trait for "Splittable" different from the trait for "Iterable"... but I still don't think an explicit "Cursor" type is necessary. The sketch above (#191 (comment)) is sufficient for Iterable, and is easily implemented in memory by any hash map. But there are extra considerations in the Splittable case that make it more difficult for an implementer:
So a potential Splittable trait might be:
|
Man, so much to catch up on lately.
If there's a way to design this so that
This is starting to sound a lot like the As a design goal, it would be nice to think of storehaus as a small set of primitives with a small surface area interface and sets of combinators for adding behavior while producing new stores which maintain the primitives interface so that the creation of stores can easily be a small matter of composition. I understand not all square pegs fit in round holes so I can see some leeway (preferable small), but it would something nice to keep in mind. As a user of an instance of a Store, I should mainly only have to think about the get/put operations + the |
I was thinking it was just a trait without a self-type, similar to WritableStore. But yea, I don't see any reason why it couldn't self-type Store.
Maybe, but I think the
Yea, that's a good point. So the Cursor type does need to be a generic parameter to Splittable.
|
I agree with @softprops that we should try and have IterableStore itself as a store. My 2 cents building on the ideas proposed so far.. Store approach: trait IterableStore[S,A] extends ReadableStore[S, (S,Seq[A])]
// S is the cursor type
// A can be K or V or (K, V) depending on what you want to iterate over
trait SplittableStore[R, K, V] extends ReadableStore[R, ReadableStore[K, V]] // or Spool[(K, V)]
// R is the range, can be a cursor pair, or something else if applicable
// get(r: R) will be analogous to subStore or subMap Or, combinator approach: class IterableStore[S, K, V, A](store: ReadableStore[K, V])
(implicit inj: Injection[(K, V), A])
extends ReadableStore[S, (S, Seq[A])]
class SplittableStore[R, K, V](store: ReadableStore[K, V])
extends ReadableStore[R, ReadableStore[K, V]) // or Spool[(K, V)] |
Argh, sorry. Dumb UI / dumb user. |
@rubanm I don't think requiring a public Cursor type in IterableStore is very helpful; it:
|
... but +1 for the proposed SplittableStore. |
@stuhood Hmm valid points. So we let the stores maintain cursor state. I think this also adequately covers the paging case as long as you are not looking for random reads. For random reads (like mysql range or limit/offset queries etc), we can use a SplittableStore. Alternatively, there is a QueryableStore in the works here #205 I've updated the pull req. Also, thanks for the Spool fix. |
def fromMap[K, V](m: Map[K, V]): IterableStore[K, V] = new MapStore(m) | ||
|
||
/** Helper method to convert Iterator to Spool. */ | ||
def iteratorToSpool[K, V](it: Iterator[(K, V)]): Future[Spool[(K, V)]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be possible to do this without the fillSpool method:
def iteratorToSpool[K, V](it: Iterator[(K, V)]): Spool[(K, V)] =
if (it.hasNext)
it.next *:: Future.value(iteratorToSpool(it))
else
Spool.empty
Thanks, looks good! Might be worthwhile to implement IterableStore on a few of storehaus' included Stores, just to make sure we're not missing anything. Or I'd be happy to do that in a separate pull? EDIT: Whoops: missed the MapStore update in there. Looks good with the exception of dropping fillSpool. |
@@ -23,6 +23,9 @@ import com.twitter.util.Future | |||
* @author Oscar Boykin | |||
* @author Sam Ritchie | |||
*/ | |||
class MapStore[K, +V](val backingStore: Map[K, V] = Map[K, V]()) extends ReadableStore[K, V] { | |||
class MapStore[K, V](val backingStore: Map[K, V] = Map[K, V]()) extends ReadableStore[K, V] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why did we lose +V
?
Fixed variance and dropped fillSpool. Thanks. |
Addresses #48
I'm thinking it may be better to not mix in
scala.collection.Iterable
, so we can wrap the store's iterator in a Future.